package com.iteaj.iot.test.server.breaker;

import cn.hutool.core.util.RandomUtil;
import com.iteaj.iot.client.mqtt.gateway.MqttGatewayConnectProperties;
import com.iteaj.iot.client.mqtt.gateway.MqttGatewayHandle;
import com.iteaj.iot.client.mqtt.gateway.adapter.MqttGatewayJsonHandle;
import com.iteaj.iot.handle.proxy.ProtocolHandleProxy;
import com.iteaj.iot.redis.handle.RedisListHandle;
import com.iteaj.iot.redis.producer.RedisProducer;
import com.iteaj.iot.server.ServerProtocolHandle;
import com.iteaj.iot.taos.TaosHandle;
import com.iteaj.iot.taos.TaosSqlManager;
import com.iteaj.iot.taos.TaosSqlMeta;
import com.iteaj.iot.test.IotTestProperties;
import com.iteaj.iot.test.TestConst;
import com.iteaj.iot.test.taos.TaosBreakerDataTable;
import com.iteaj.iot.test.taos.TaosBreakerUsingStable;
import com.iteaj.iot.tools.db.DefaultFieldMeta;
import com.iteaj.iot.tools.db.FieldMeta;
import com.iteaj.iot.tools.db.rdbms.RdbmsHandle;
import com.iteaj.iot.tools.db.rdbms.RdbmsMeta;
import com.iteaj.iot.tools.db.rdbms.RdbmsSqlManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import java.sql.Types;
import java.util.*;
import java.util.concurrent.Executor;

@Component
@ConditionalOnExpression("${iot.test.breaker.start:false} and ${iot.test.server:false}")
public class DataAcceptHandle implements ServerProtocolHandle<DataAcceptProtocol>, InitializingBean, RdbmsHandle<DataAcceptProtocol>
        , TaosHandle<DataAcceptProtocol>, RedisListHandle<DataAcceptProtocol, Object>, MqttGatewayJsonHandle<DataAcceptProtocol, Object> {

    @Autowired
    private IotTestProperties properties;
    @Autowired
    private ThreadPoolTaskScheduler taskScheduler;
    @Autowired(required = false)
    private RdbmsSqlManager rdbmsSqlManager;
    @Autowired(required = false)
    private TaosSqlManager taosSqlManager;

    private RdbmsMeta rdbmsMeta;
    private TaosSqlMeta taosSqlMeta;
    private MqttGatewayConnectProperties gatewayConnectProperties;
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public boolean filter(Object entity, Class<? extends ProtocolHandleProxy> proxy) {
        if(TaosHandle.class == proxy && properties.isTaosStart()) {
            return true;
        } else if(MqttGatewayHandle.class == proxy && properties.getMqtt().isStart()) {
            return true;
        } else if(RedisProducer.class == proxy && properties.isRedisStart()) {
            return true;
        } else if(RdbmsHandle.class == proxy && properties.isRdbmsStart()) {
            return true;
        }else {
            return false;
        }
    }

    @Override
    public Integer consumer(List<Object> objects) {
        logger.info("Redis消费测试 消费数量：{}", objects.size());
        return objects.size();
    }

    @Override
    public Object handle(DataAcceptProtocol protocol) {
        final int i = RandomUtil.randomInt(1, 9);
        if(i % 2 == 0) { // 测试自动创建数据表
            TaosBreakerUsingStable entity = new TaosBreakerUsingStable(protocol.getEquipCode());
            entity.setI(protocol.getI());
            entity.setV(protocol.getV());
            entity.setPy(protocol.getPy());
            entity.setSn(protocol.getEquipCode());
            entity.setPower1(protocol.getPower1());
            entity.setPower2(protocol.getPower2());

            if(i == 6 || i == 8) {
                // 使用map值的形式写入数据
                Map<String, Object> valueMap = new HashMap<>();
                valueMap.put("ts", new Date());
                valueMap.put("i", protocol.getI());
                valueMap.put("v", protocol.getV());
                valueMap.put("py", protocol.getPy());
                valueMap.put("sn", protocol.getEquipCode());
                valueMap.put("power1", protocol.getPower1());
                valueMap.put("power2", protocol.getPower2());
                int value = 0;

                if(properties.isTaosStart()) {
                    value = taosSqlManager.insert("meters", valueMap);
                    if(value > 0) {
                        logger.info(TestConst.LOGGER_DATA_COLLECT_DESC, "时序数据库(taos)", "MAP传参", "通过");
                    }
                }

                value = 0;
                if(properties.isRdbmsStart()) {
                    value = rdbmsSqlManager.insert("t_collect_map", valueMap);
                    if(value > 0) {
                        logger.info(TestConst.LOGGER_DATA_COLLECT_DESC, "关系型数据库", "MAP传参", "通过");
                    }
                }

                return value > 0 ? null : entity;
            } else {
                return entity;
            }
        } else { // 测试插入数据表
            TaosBreakerDataTable dataTable = new TaosBreakerDataTable();
            dataTable.setI(protocol.getI());
            dataTable.setV(protocol.getV());
//            dataTable.setPy(protocol.getPy());
            dataTable.setTs(new Date());
            dataTable.setSn(protocol.getEquipCode());
            dataTable.setPower1(protocol.getPower1());
            dataTable.setPower2(protocol.getPower2());

            if((i == 7 || i == 5)) {
                int value = 0;

                // 手动批量插入
                if(properties.isTaosStart()) {
                    value = taosSqlManager.batchInsert(TaosBreakerDataTable.class, Arrays.asList(dataTable, dataTable));
                    if(value > 0) {
                        logger.info(TestConst.LOGGER_DATA_COLLECT_DESC, "时序数据库(taos)", "ENTITY传参", "通过");
                    }
                }

                value = 0;
                if(properties.isRdbmsStart()) {
                    value = rdbmsSqlManager.batchInsert(TaosBreakerDataTable.class, Arrays.asList(dataTable, dataTable));
                    if(value > 0) {
                        logger.info(TestConst.LOGGER_DATA_COLLECT_DESC, "关系型数据库", "ENTITY传参", "通过");
                    }
                }

                return value > 0 ? null : dataTable;
            } else {
                return dataTable;
            }
        }
    }

    @Override
    public String getKey() {
        return "Break_Redis_test";
    }

    @Override
    public MqttGatewayConnectProperties getProperties(Object entity) {
        return this.gatewayConnectProperties;
    }

    @Override
    public Executor executor(Object entity, Class<? extends ProtocolHandleProxy> proxy) {
        if(proxy == MqttGatewayHandle.class) {
            return taskScheduler;
        } else {
            return null;
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        IotTestProperties.TestMqttConnectProperties mqtt = properties.getMqtt();
        if(mqtt != null) {
            this.gatewayConnectProperties = new MqttGatewayConnectProperties(mqtt.getHost()
                    , mqtt.getPort(), "MqttGatewayTCI", "/breaker/gateway");
        }

        List<FieldMeta> fieldMetas = new ArrayList<>();
        fieldMetas.add(new DefaultFieldMeta(Types.DOUBLE, "i"));
        fieldMetas.add(new DefaultFieldMeta(Types.DECIMAL, "v"));
        fieldMetas.add(new DefaultFieldMeta(Types.TIMESTAMP, "ts"));
        fieldMetas.add(new DefaultFieldMeta(Types.FLOAT, "power1"));
        fieldMetas.add(new DefaultFieldMeta(Types.FLOAT, "power2"));
        fieldMetas.add(new DefaultFieldMeta(Types.DOUBLE, "py"));

        if(rdbmsSqlManager != null) {
            this.rdbmsMeta = new RdbmsMeta("t_collect_map", fieldMetas);
            this.rdbmsSqlManager.register(this.rdbmsMeta);
        }

        if(taosSqlManager != null) {
            this.taosSqlMeta = new TaosSqlMeta("meters", "'t_'+#root.sn", fieldMetas);
            this.taosSqlManager.register(this.taosSqlMeta);
        }
    }
}
